package com.example.dobs.demo.flink.realtime.report.study.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import com.example.dobs.demo.flink.realtime.report.study.flink.bean.Event;
import com.example.dobs.demo.flink.realtime.report.study.flink.bean.EventEnum;

import java.util.Properties;

public class KafkaTest {
    /**
     * Builds and runs the streaming report job.
     *
     * <p>Pipeline, in order: Kafka source, JSON validity filter, per-event line extraction
     * ({@link #requestLogParser} / {@link #impressionLogParser}), watermark assignment, split into
     * a request side output and an impression/click side output, union of both keyed by
     * {@code rid + pid} and handed to {@code RealtimeReportProcessFunction}, a time window keyed by
     * the event's table dimension that sums the counters, and finally a Kafka sink.
     *
     * @param args command-line arguments (not read)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {

        // Window length in minutes.
        // NOTE(review): the original comment said "30 minutes" while the value is 5 - confirm.
        int timeToLiveMinutes = 5;
        int kafkaParallelism = 4; // 4 in production, 2 in test
        int sinkKafkaParallelism = 2; // 4 in production, 2 in test
        Configuration conf = new Configuration();
        //set global config
        // NOTE(review): conf is populated but never handed to the environment in this method
        // (e.g. via env.getConfig().setGlobalJobParameters(conf)) - confirm whether any operator
        // expects to read "time-to-live-minutes" from the global job parameters.
        conf.setInteger("time-to-live-minutes", timeToLiveMinutes);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // time characteristic
        env.getConfig().setAutoWatermarkInterval(5000);
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "prod.metis-ads.cbs.sg1.kafka:9092");
//        kafkaProps.setProperty("group.id", "metis_ads_adcs_cold_adx");
//        kafkaProps.setProperty("enable.auto.commit", "false");
        String topic = "metis_ads_adcs_cold_adx";
        Properties sinkKafkaProps = new Properties();
        sinkKafkaProps.setProperty("bootstrap.servers", "test.test0912.adx.ads.sg1.mq:9092");
//        sinkKafkaProps.setProperty("group.id", "ads_adx_grtest0912");
//        sinkKafkaProps.setProperty("enable.auto.commit", "false");
        String sinkTopic = "ads_adx_test0912";

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProps);

        SingleOutputStreamOperator<String> dataStream = env.addSource(consumer).setParallelism(kafkaParallelism).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                try {
                    // Skip records that cannot be parsed. A record without a "params" object is
                    // skipped as well: both parsers invoked by the flatMap below dereference it,
                    // so letting it through would fail the job with a NullPointerException.
                    JSONObject eventJson = extractEventJson(value);
                    return eventJson != null && eventJson.getJSONObject("params") != null;
                } catch (Exception e) {
                    // Any parse failure (bad envelope, missing "message", bad payload) means skip.
                    return false;
                }
            }
        }).setParallelism(kafkaParallelism)
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        JSONObject eventJson = extractEventJson(value);
                        String event_name = eventJson.getString("event_name");
                        if (EventEnum.Request.getName().equals(event_name)) {
                            requestLogParser(eventJson, out);
                        } else if (EventEnum.Show.getName().equals(event_name)) {
                            impressionLogParser(eventJson, out);
                        } else if (EventEnum.Click.getName().equals(event_name)) {
                            // Clicks share the impression parser, which tells the two apart itself.
                            impressionLogParser(eventJson, out);
                        }  // any other event type emits nothing
                    }
                }).setParallelism(kafkaParallelism)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2())
                .setParallelism(kafkaParallelism).name("adx_report_water_mark"); // open question from the author: is a watermark needed here?
        // Split the single input stream into two: req (requests) and imp (impressions and clicks).
        OutputTag<Tuple2<String, String>> reqTag = new OutputTag<Tuple2<String, String>>("request") {
        };
        OutputTag<Tuple2<String, String>> impTag = new OutputTag<Tuple2<String, String>>("impression") {
        };
        SingleOutputStreamOperator<Tuple2<String, String>> mainDataStream = dataStream.process(new ProcessFunction<String, Tuple2<String, String>>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                // Route every line to one of the two side outputs; nothing goes to the main output.
                if (value.startsWith(EventEnum.Request.getName())) {
                    ctx.output(reqTag, new Tuple2<String, String>("request", value));
                } else {
                    //debug pass
//                    System.out.println(value.toString());
                    ctx.output(impTag, new Tuple2<String, String>("impression", value));
                }
            }
        }).setParallelism(kafkaParallelism).name("adx_split_request_impression");
        DataStream<Tuple2<String, String>> reqSideOutput = mainDataStream.getSideOutput(reqTag).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
        DataStream<Tuple2<String, String>> impSideOutput = mainDataStream.getSideOutput(impTag).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
        // impSideOutput.print();  //debug pass
        //
        SingleOutputStreamOperator<Tuple2<String, String>> joinedDataStreamOperator = impSideOutput.union(reqSideOutput).keyBy(new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> value) throws Exception {
                // Key by request id concatenated with placement id.
                Event event = new Event();
                event.readByLine(value.f1);
                return event.rid + event.pid;
            }
        })
                .process(new RealtimeReportProcessFunction())
                .name("adx_report_join")
                .setParallelism(kafkaParallelism); // Kafka parallelism: 1 in test, 4 in production

        // Aggregate the results.
        SingleOutputStreamOperator<String> outputStream = joinedDataStreamOperator.keyBy(new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> value) throws Exception {
                Event event = new Event().readByLine(value.f1);

                return event.getTableDimension();
            }
        }).timeWindow(Time.minutes(timeToLiveMinutes))
                .process(new ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Tuple2<String, String>> elements, Collector<String> out) throws Exception {
                        float price = 0; // bid price (computed from the impression log)
                        long req = 0;    // 1 per request, otherwise 0
                        long imp = 0;    // 1 per impression, otherwise 0
                        long click = 0;  // 1 per click, otherwise 0
                        long bid = 0;  // 1 per bid; condition: sts=1, i.e. the ad can be returned normally
                        long partBid = 0;  // 1 per participation in bidding; condition: sts!=2 and sts!=8 (an unqualified creative still counts as participating)
                        long bidWon = 0; // 1 per won bid or request; condition: sts=1 and the dsp won
                        long requestWon = 0; // equivalent to bidWon
                        String timestamp = "0001000100010"; // 13-character timestamp, used as the initial minimum
                        Event resultEvent = null;
                        for (Tuple2<String, String> value : elements) {
                            Event event = new Event().readByLine(value.f1);
                            if (resultEvent == null) {
                                // The first element of the window supplies the non-aggregated fields.
                                resultEvent = new Event().readByLine(value.f1);
                            }
                            price += Float.parseFloat(event.price);
                            req += Long.parseLong(event.req);
                            imp += Long.parseLong(event.imp);
                            click += Long.parseLong(event.click);
                            bid += Long.parseLong(event.bid);
                            partBid += Long.parseLong(event.partBid);
                            bidWon += Long.parseLong(event.bidWon);
                            requestWon += Long.parseLong(event.requestWon);
                            // Keep the lexicographically greatest timestamp seen in the window.
                            timestamp = event.timestamp.compareTo(timestamp) > 0 ? event.timestamp : timestamp;
                        }
                        if (resultEvent != null) {
                            resultEvent.price = String.valueOf(price);
                            resultEvent.req = String.valueOf(req);
                            resultEvent.imp = String.valueOf(imp);
                            resultEvent.click = String.valueOf(click);
                            resultEvent.bid = String.valueOf(bid);
                            resultEvent.partBid = String.valueOf(partBid);
                            resultEvent.bidWon = String.valueOf(bidWon);
                            resultEvent.requestWon = String.valueOf(requestWon);
                            resultEvent.timestamp = timestamp;
                            out.collect(resultEvent.toJsonString());
                        }

                    }
                }).name("adx_report_static").setParallelism(sinkKafkaParallelism);
        // Output.
        outputStream.addSink(new FlinkKafkaProducer<String>(sinkTopic, new SimpleStringSchema(), sinkKafkaProps)).name("adx_report_sink").setParallelism(sinkKafkaParallelism);
        env.execute();
    }

    /**
     * Unwraps the event payload from a raw Kafka record: the record is a JSON envelope whose
     * {@code "message"} field holds tab-separated text, and the first tab-separated segment is
     * parsed as the event JSON.
     *
     * @param value raw record text
     * @return the parsed event, or whatever {@code JSON.parseObject} returns for the first segment
     * @throws RuntimeException if the envelope or payload is not valid JSON, or the envelope is
     *                          empty or has no {@code "message"} field (NullPointerException)
     */
    private static JSONObject extractEventJson(String value) {
        String message = JSON.parseObject(value).getString("message");
        return JSON.parseObject(message.split("\t")[0]);
    }

    /**
     * Parses an impression (show) or click log and emits one event line.
     *
     * <p>The emitted {@code Event} carries the event name, {@code rid}, {@code params.pid}, the
     * {@code tm} timestamp, and the counters {@code imp}/{@code click}: a show event yields
     * {@code imp=1, click=0}; any other event passed in yields {@code imp=0, click=1}.
     * A log without a {@code params} object is skipped, because the placement id cannot be read
     * from it.
     *
     * @param eventJson impression or click log, as JSON
     * @param out       collector receiving the serialized event
     */
    private static void impressionLogParser(JSONObject eventJson, Collector<String> out) {
        JSONObject params = eventJson.getJSONObject("params");
        if (params == null) {
            // Previously this dereferenced params unconditionally and threw a NullPointerException.
            return;
        }

        String eventName = eventJson.getString("event_name");
        boolean isShow = EventEnum.Show.getName().equals(eventName);

        Event event = new Event();
        event.eventName = eventName;
        event.rid = eventJson.getString("rid");
        event.pid = params.getString("pid");
        event.imp = isShow ? "1" : "0";
        event.click = isShow ? "0" : "1";
        event.timestamp = eventJson.getString("tm");
        out.collect(event.toString());
    }

    /**
     * Parses a request log and emits one event line per entry of {@code params.r_dtl} that has a
     * DSP name.
     *
     * <p>Output fields as listed by the original author:
     * (1) event_name,
     * (2) rid,
     * (3) pkg/product,
     * (4) v_code/version_code,
     * (5) pid,
     * (6) nation,
     * (7) dsp (request),
     * (8) price (impression),
     * (9) req (request count),
     * (10) req_won (same as bid_won),
     * (11) imp (impression count),
     * (12) click (click count),
     * (13) bid (valid bid),
     * (14) parti_bid (participated in bidding),
     * (15) bid_won (won the bidding),
     * (16) pid_zh (Chinese name, requires a dictionary lookup),
     * (17) adf_id (creative),
     * (18) tm (event time).
     * This method itself does not assign imp, click or pid_zh.
     *
     * <p>Nothing is emitted when {@code params} or {@code params.r_dtl} is missing. Entries of
     * {@code r_dtl} that are not JSON objects, or that have no {@code dname}, are skipped.
     *
     * @param eventJson request log, as JSON
     * @param out       collector receiving the serialized events
     */
    private static void requestLogParser(JSONObject eventJson, Collector<String> out) {
        JSONObject params = eventJson.getJSONObject("params");
        if (params == null) {
            // Previously this dereferenced params unconditionally and threw a NullPointerException.
            return;
        }
        JSONArray requestDetail = params.getJSONArray("r_dtl");
        if (requestDetail == null) {
            return;
        }

        String eventName = eventJson.getString("event_name");
        String rid = eventJson.getString("rid");
        String versionCode = eventJson.getString("v_code");
        String timestamp = eventJson.getString("tm");
        String product = params.getString("pkg");
        String pid = params.getString("pid");
        String nation = params.getString("nation");
        /*
        resultDsp is empty in the following cases:
        1) there was no request;
        2) there was a request, but no dsp has sts equal to 1
        */
        String resultDsp = params.getString("dspName");

        for (Object obj : requestDetail) {
            if (!(obj instanceof JSONObject)) {
                // A malformed entry used to abort the whole record with a ClassCastException.
                continue;
            }
            JSONObject detail = (JSONObject) obj;
            String dsp = detail.getString("dname");
            if (dsp == null) { // no dsp name: drop this entry
                continue;
            }

            String sts = detail.getString("sts");
            int bid = "1".equals(sts) ? 1 : 0;
            // Participation is documented in this file's aggregation field notes as
            // "sts != 2 and sts != 8". The previous expression was inverted (it counted only
            // sts 2 and 8), so even a valid bid (sts=1) was never counted as participating.
            // A missing sts still counts as 0, as before.
            int partiBid = (sts != null && !"2".equals(sts) && !"8".equals(sts)) ? 1 : 0;
            // NOTE(review): the aggregation field notes describe bid_won as "sts=1 and the dsp
            // won"; only the dsp-name match is checked here - confirm whether sts must be checked.
            int bidWon = dsp.equals(resultDsp) ? 1 : 0; // bid success
            int reqWon = bidWon;

            String price = detail.getString("pr");
            if (price == null) {
                price = "0";
            }

            Event event = new Event();
            event.eventName = eventName;
            event.rid = rid;
            event.product = product;
            // String.valueOf turns a missing value into the text "null"; kept as in the original.
            event.vCode = String.valueOf(versionCode);
            event.pid = pid;
            event.nation = nation;
            event.dsp = dsp;
            event.price = price; // author's note: hopefully this value is not used
            event.req = "1"; // every emitted entry counts as one request
            event.requestWon = String.valueOf(reqWon);
            event.bid = String.valueOf(bid);
            event.partBid = String.valueOf(partiBid);
            event.bidWon = String.valueOf(bidWon);
            event.adfId = String.valueOf(detail.getString("adf_id"));
            event.timestamp = timestamp;
            out.collect(event.toString());
        }
    }


}
